[Tracking] feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366
[Tracking] feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366schenksj wants to merge 110 commits into
Conversation
…e hardening) Addresses the 8 findings from the independent code review (see PR apache#4366 comments). 49/49 contrib tests still pass on BOTH Spark 4.1 + Delta 4.1.0 and Spark 3.5 + Delta 3.3.2 after these changes. Critical: 1. native/shuffle/src/spark_unsafe/unsafe_object.rs: replace `from_utf8_unchecked` with `from_utf8_lossy` returning `Cow<'_, str>`. The previous version constructed a `&str` from arbitrary bytes (Spark's binary-cast-to-string case, e.g. Delta's Z-Order `interleave_bits(...).cast(StringType)`) -- the Rust reference defines that as UB even when the bytes only get copied downstream, because downstream Arrow ops internally use `str::from_utf8_unchecked` on the StringArray buffer and would propagate the UB. `from_utf8_lossy` is well-defined: zero-cost borrow for valid UTF-8, allocates a String with U+FFFD replacements for invalid bytes (only fires on the binary-cast case, which Spark never displays as text anyway). All call sites pass to `StringBuilder::append_value` which takes `AsRef<str>`; `Cow<str>`'s `AsRef<str>` impl makes them work transparently. No call-site changes. 2. DeltaIntegration.scala: narrow the `case _: Exception => None` swallow in `transformV1IfDelta` to ONLY catch true reflection binding failures (`NoSuchMethodException`/`NoSuchFieldException`/ `IllegalAccessException`) and invocation errors (`IllegalAccessException`/`IllegalArgumentException`). An `InvocationTargetException` -- the contrib's transform actually threw -- now log-warnings and declines instead of silently falling back to vanilla. Without this, kernel-rs IO errors, CCE on a Delta version bump, NPE in the CM-id translator etc. would silently decline and the user would never know. Same narrowing applied to `scanHandler` and `DeltaPlanDataInjector` lookup (operators.scala). Should-fix: 3. CometExecRDD.compute: don't set InputFileBlockHolder when a partition has multiple files. Previous code took `partition.filePaths.head` always, which would silently report the first file's path for every row when a contrib accidentally batched multiple files in one partition. (Tried `require(length == 1)` first; that's too strict because partitioned reads legitimately have multi-file partitions but don't query `input_file_name()`. Skipping the hook on multi-file partitions preserves correctness for `input_file_name()` callers -- which MUST one-task-per-partition anyway -- without false-positive failing legitimate partitioned reads.) 4. engine.rs: LRU-bound the engine cache at MAX_CACHE_ENTRIES=32. The cache key included `DeltaStorageConfig` which contains `aws_session_token`; long-running drivers with rotating STS/IRSA credentials would grow one entry per rotation and LEAK one `TokioBackgroundExecutor` thread per stale entry. With LRU eviction, `Arc<DeltaEngine>` drops on eviction, `DefaultEngine` drops its `TokioBackgroundExecutor`, the OS thread joins, thread count stabilizes. Test `get_or_create_engine_evicts_lru_when_full` verifies the bound + eviction order. Nits: 5. planner.rs: error message for the "DeltaScan in default build" case now mentions BOTH `-Pcontrib-delta` (Maven) and `--features contrib-delta` (Cargo) -- previously mentioned only the Cargo flag. 6. dev/verify-contrib-delta-gate.sh: also assert the contrib-enabled libcomet has >0 Delta-related external symbols. Without this, a future Rust toolchain change that mangles symbol names differently would silently turn the default-build symbol check into a no-op while still passing -- the gate would lie about being enforced. Asserting both "default has 0" AND "contrib has >0" catches grep pattern drift. Build infrastructure: 7. pom.xml + spark/pom.xml: move `<delta.version>` default to the parent POM's top-level properties. Per-Spark-profile `delta.version` overrides cleanly (spark-3.5 -> 3.3.2, spark-4.1 -> 4.1.0), and spotless-style invocations without a Spark profile still resolve the property. The previous arrangement (default in `contrib-delta` profile) had Spark-profile overrides silently lose to the contrib-delta default because of POM profile-document-order property precedence. 8. Make `PlanDataInjector` and `DeltaIntegration` extend `org.apache.spark.internal.Logging` so the new `logWarning` calls compile. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A field of a NULL struct must be NULL (Spark semantics). Arrow stores a StructArray's child arrays with their own validity, INDEPENDENT of the parent struct's null buffer, so the raw child value at a row where the struct itself is null can be non-null (e.g. parquet files where a logically-null struct column still carries a populated child buffer). GetStructField.evaluate returned the child column verbatim, so isnotnull(struct.field) wrongly evaluated TRUE for a null struct. Fix: union the parent struct's null mask into the extracted child (null where the struct is null OR the child is null). Adds a standalone unit test that fails without the fix and passes with it. Closes apache#4432 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark's UnsafeRow.getUTF8String performs no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy reinterpret, so a StringType column can legitimately hold arbitrary non-UTF-8 bytes. get_string decoded with from_utf8(..).unwrap(), which panics on such rows even though Spark treats them as opaque. Use from_utf8_lossy (returning Cow<str>): a zero-cost borrow for valid UTF-8 and a String with U+FFFD replacements otherwise -- defined behavior, no UB. Avoids from_utf8_unchecked, which would construct a &str from arbitrary bytes (UB) and propagate into downstream Arrow ops. Adds a standalone unit test that panics without the fix and passes with it. Closes apache#4521 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…chemes Comet's native readers go through object_store, which only understands a fixed set of URL schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs.<scheme>.impl) crashes the native reader at execution with "Generic URL error: Unable to recognise URL", with no graceful recovery. Decline such scans at planning time so Spark's Hadoop-FS-aware reader handles them. Whether object_store recognizes a scheme is answered by the native layer itself (NativeBase.isObjectStoreSchemeSupported, backed by object_store's ObjectStoreScheme::parse -- the same path prepare_object_store_with_configs uses) rather than a hardcoded list, so the planner can't drift from object_store's actual support. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is unioned in on the JVM side; results are cached per scheme; if native can't be consulted the scheme is assumed supported rather than over-restricting. Adds CometScanSchemeFallbackSuite, which asserts a `fake://` scan falls back to Spark; it fails without the gate (Comet claims the scan) and passes with it. Closes apache#4520 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
A left-deep chain of N associative boolean operands serializes to a proto nested N levels deep. With N > protobuf's default recursion limit (100), the message overflows when the serialized plan is re-parsed -- on the JVM via Operator.parseFrom (findShuffleScanIndices / explain) and in the Rust prost decoder -- failing an otherwise-supported query. Comet evaluates AND/OR vectorially (both sides always evaluated, no row-level short-circuit), so the chains are fully associative. Flatten each chain and rebuild it as a balanced O(log n) tree before serialization; this is semantically identical and only changes the proto's shape. Adds QueryPlanSerde.flattenAssociative + createBalancedBinaryExpr and routes CometAnd / CometOr through them. Closes apache#4526 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark wraps file-source partition columns and other per-batch constants in ConstantColumnVector. When such a batch reaches Comet's serialization path (Utils.getBatchFieldVectors, used by broadcast/shuffle) or FFI export path (NativeUtil.exportBatch), it was rejected with "Comet execution only takes Arrow Arrays". Materialize the constant into a fresh Arrow FieldVector (the constant repeated numRows times) inline. The materializer reuses the existing per-type ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and complex struct/array/map -- and stays in sync with Spark's type handling. Adds ConstantColumnVectors.materialize (arrow package) + Utils.materializeConstantColumnVector, with new match arms in getBatchFieldVectors and exportBatch. Closes apache#4527 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
DataFusion's make_array asserts strict element-type equality in MutableArrayData and panics on a mismatch. Spark's CreateArray coerces element types with `sameType`, which ignores nullability, so children that share a surface type but differ only in a nested struct field's nullability get no unifying cast (e.g. array(struct(a not null), struct(a nullable))). Native execution then panics: "Arrays with inconsistent types passed to MutableArrayData". DataFusion tolerates container nullability differences (ArrayType.containsNull / MapType.valueContainsNull are coerced), so decline only the cases that actually panic: children that still differ after normalizing container nullability while keeping struct field nullability significant. Those fall back to Spark's evaluator. Closes apache#4528 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
PlanDataInjector.injectPlanData walked every operator in the tree against every registered injector (`for (injector <- injectors if injector.canInject(op))`) -- N operators x M injectors canInject calls -- even though most operators in any tree are non-scan and match no injector. Add `opStructCase` to the PlanDataInjector trait and key a Map[OpStructCase, PlanDataInjector]. Look up by op.getOpStructCase (O(1)) then a single canInject confirm; non-scan operators skip the iteration entirely. Pure performance change -- no behavior difference. Closes apache#4530 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
When Comet's native DataFusion scan hits a corrupt footer, corrupt page/column
data, a truncated/empty file, or a deleted file, it rethrew the raw native
message instead of Spark's FAILED_READ_FILE. The native path does not go through
Spark's FileScanRDD, so the offending path was usually missing too.
Classify these failures by TYPED DataFusionError variant in the native error
path (ParquetError / ObjectStore / ArrowError-wrapping-ParquetError / IoError,
unwrapping Context/Shared) rather than by matching error-message prose -- the
strings come from three upstream crates (DataFusion, arrow-rs, object_store) and
drift across version bumps with no compile-time signal. The match arms are
checked by the compiler.
- native: new SparkError::CannotReadFile { file_path, message } variant; a typed
try_classify_file_read_error in the JNI bridge converts a file-read
DataFusionError into it, replacing the previous "not found"/"No such file"
string match. file_path is taken from object_store::Error::NotFound when
available. Deliberately does NOT match object_store Generic errors (also used
for non-file config errors that must surface as-is).
- JVM: the structured error crosses JNI as the existing CometQueryExecutionException
JSON payload; SparkErrorConverter decodes "CannotReadFile" and, when the native
error carried no path, fills it from the per-task file list threaded from
CometNativeScanExec via CometExecRDD. The shims wrap it via
QueryExecutionErrors.cannotReadFilesError. No JVM-side message matching.
Closes apache#4529
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new PlanDataInjectorSuite alongside its sibling org.apache.spark.sql.comet suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new CometScanSchemeFallbackSuite alongside its sibling org.apache.comet.rules suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…la 2.12 SQLTestUtils.withSQLConf returns Unit on Spark 3.5 but a value on Spark 4.x, so assigning its block result to `val sparkPlan: SparkPlan` failed to compile under the spark-3.5 profile (type mismatch: found Unit, required SparkPlan). Capture the plan via a var assigned inside the block, which is cross-version-safe. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback: import java.lang.Boolean (as JBoolean), java.net.URI, java.util.Locale and java.util.concurrent.ConcurrentHashMap rather than referencing them with fully-qualified class names in the newly-added scheme-gating code. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…R WHERE Address review feedback on the deep-chain rebalancing PR: - flattenAssociative now uses an explicit work stack and an accumulating buffer instead of recursion. The chains that trigger this are left-deep and O(n) deep, so the prior recursive walk could itself overflow the JVM stack and the `++` accumulation was O(n^2). - The recursion-limit test now mixes a nullable column into the chains so the rebalanced tree is exercised under SQL three-valued logic, and adds a deep OR in a WHERE clause -- a common trigger that, unlike a top-level AND, Spark does not split and so stays deeply nested. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…che#4521) Address review feedback: add a Spark-level regression test demonstrating the bug. cast(binary -> string) is a zero-copy reinterpret in Spark, so a StringType column can hold arbitrary non-UTF-8 bytes. The test disables Comet's Cast so those raw bytes reach Comet's columnar (JVM) shuffle inside a JVM UnsafeRow, exercising the native row->Arrow get_string path that used to panic via from_utf8(..).unwrap() and now decodes lossily. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rename) The unsupported-scheme fallback still called withInfo, the old name of withFallbackReason (renamed in apache#4508). It was the only remaining old-name call in the file and broke compilation after merging main; rename it to match the rest of CometScanRule. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… nullable GetStructField::nullable() reported only the extracted field's own nullability, ignoring whether the parent struct can be null. A field of a null struct is null (Spark semantics, enforced here by project_field unioning the parent null mask), so a NON-nullable field of a NULLABLE struct must itself be reported nullable. Reporting the field's own flag under-declares: the projected column then carries the parent's nulls while claiming non-nullable, and Arrow RecordBatch validation rejects it downstream with "Column '...' is declared as non-nullable but contains null values" (e.g. once the column reaches a shuffle read-back or a projection over a final aggregate). This is the companion to the value-side null-mask propagation in this PR -- the value is now correct, this makes the declared nullability match. Mirrors Spark's GetStructField.nullable = child.nullable || field.nullable. Surfaced by Delta's action frame: each log row is exactly one action type, so the action columns (add, remove, ...) are nullable structs whose inner fields are declared NON-nullable by Delta's typed SingleAction schema (e.g. add.size). The non-AddFile rows leave add null, so add.size carries nulls while declared non-nullable, crashing Comet's native shuffle during OPTIMIZE / commit. Tests: - Rust unit tests for the nullability matrix (nullable/non-nullable parent x field). - A Spark repro in CometExpressionSuite that builds that exact shape with an explicit in-memory schema (a Parquet round-trip would mark every field nullable, and a CreateNamedStruct would be declined), shuffles it, and projects the non-nullable inner field. It fails with the above error before this fix and passes after. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…Error), not CannotReadFile
A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated one:
Spark surfaces it via `readCurrentFileNotFoundError` ("It is possible the underlying files
have been updated."), not `cannotReadFilesError` (FAILED_READ_FILE). `try_classify_file_read_error`
mapped every per-file read failure -- including NotFound -- to `SparkError::CannotReadFile`, so a
file removed between planning and execution produced the wrong Spark error.
Classify object_store NotFound as `SparkError::FileNotFound` instead. The NotFound may arrive
directly (`DataFusionError::ObjectStore`) or wrapped by the parquet reader as
`ParquetError::External(..)` / `ArrowError::ParquetError`, so a `source_chain_has_object_store_not_found`
helper walks the typed source chain (never message text). Corrupt/truncated reads stay
CannotReadFile -> FAILED_READ_FILE. The JVM shim already maps the `FileNotFound` errorType to
`readCurrentFileNotFoundError`, so no shim change is needed.
Surfaced by Delta's CDC-after-VACUUM read: `DeltaVacuumSuite` "vacuum for cdc - update/merge" and
"... - delete tombstones" vacuum the `_change_data` files and assert the subsequent read throws
`readCurrentFileNotFoundError`; with the native scan these failed because Comet returned the
cannotReadFilesError message. Both pass with this fix (verified locally).
Tests:
- Rust unit tests for the classifier: object_store NotFound (direct and ParquetError::External-wrapped)
-> FileNotFound; corrupt ParquetError stays CannotReadFile.
- Spark `CometExecSuite` "native parquet read of a missing file surfaces readCurrentFileNotFoundError"
(red before, green after): reads a file deleted between planning and execution.
- Made the existing FAILED_READ_FILE corrupt-file assertion spark-version-stable (assert
"Encountered error while reading file" -- present on both 3.5 and 4.x; only 4.x prepends the
[FAILED_READ_FILE.NO_HINT] class tag), so the test passes under -Pspark-3.5 as well.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ons, IoError scoping Addresses @andygrove's review on apache#4536: - spark-3.4 shim: add the `CannotReadFile` case (it only existed in the 3.5 and 4.x shims), so a corrupt/truncated read is wrapped via `cannotReadFilesError` (FAILED_READ_FILE) on Spark 3.4 too. (The `FileNotFound` case was already present on 3.4.) - SparkErrorConverterSuite: assert on the version-stable message ("Encountered error while reading file ...") instead of the `FAILED_READ_FILE` literal, which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 render only the message). Fixes the two failing tests on 3.4/3.5; same version-stable style already applied to the CometExecSuite e2e test. - native classifier: stop treating a bare `DataFusionError::IoError` as a file read. Scans surface read failures as a typed ParquetError/ObjectStore error; a bare IoError can also come from non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE with a per-task path attached. Test updated accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ow path Follow-up to @andygrove's review on apache#4536: - (point 3, wording) parquet-rs reports a bad magic / unreadable footer as "Invalid Parquet file. Corrupt footer", whereas Spark's reader -- and Spark's `ParquetQuerySuite` ("ignoreCorruptFiles", "ignoreMissingFiles using parquet") -- phrase it as "<file> is not a Parquet file". `cannot_read_file_message` now appends Spark's phrasing for the magic/footer case so the FAILED_READ_FILE cause carries it. The outer `cannotReadFilesError` wrapper ("Encountered error while reading file …") is unchanged, so this composes with Spark's tests and does not disturb the Delta shims that match Comet's outer message. Other read failures keep their native message. (On behavior: the native scan already declines and falls back to Spark when `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` is enabled -- CometNativeScan.scala -- so the skip semantics are preserved; no behavior gap.) - (point 5, tidy) `try_classify_file_read_error` is no longer evaluated twice (`.is_some()` guard + `.unwrap()`): the DataFusion arm is a single `if let Some(..)`, and the generic fallback is extracted to `throw_generic_exception`. Tests: classifier unit tests for the magic/footer wording (added) vs other parquet errors (unchanged native message). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace `String::from_utf8_lossy` in `get_string` with `decode_utf8_spark_lossy`, which mirrors `sun.nio.cs.UTF_8.Decoder` (action REPLACE) byte-for-byte so a Comet columnar shuffle of arbitrary bytes renders identically to a Spark JVM shuffle. `from_utf8_lossy` follows the Unicode "maximal subpart" rule and can emit more than one U+FFFD per ill-formed multi-byte unit; the JDK collapses certain units (notably surrogate-range three-byte sequences `ED A0..BF ..`, e.g. CESU-8 / modified-UTF-8 supplementary chars) into a single U+FFFD. Valid UTF-8 still returns a zero-cost borrow via the fast path. Tests use JDK-17 `new String(bytes, UTF_8)` output as the oracle: a 7-case replacement-granularity table (incl. the `ED A0 80` -> single U+FFFD parity case), zero-copy borrow for valid UTF-8, and valid multibyte chars preserved around an invalid byte. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback on apache#4525. When `spark.hadoop.fs.comet.libhdfs.schemes` is unset, the scheme gate now defaults `libhdfsSchemes` to `Set("hdfs")` rather than the empty set, mirroring the native default: `is_hdfs_scheme` (parquet_support.rs) treats `hdfs` as natively readable when the config is unset, and `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). Previously a plain `hdfs://` V1 scan was declined and silently fell back to Spark in the default HDFS configuration even though native could read it. `s3a`/`file` are unaffected (object_store recognizes them via `parse_url`); an explicit config value still takes over verbatim. Test: add `native scan claims hdfs:// when libhdfs.schemes is unset` to CometScanSchemeFallbackSuite, alongside the existing `fake://` decline case. It backs the `hdfs` scheme with a local FS (FakeHdfsSchemeFileSystem) so an `hdfs://` path is exercised without a live cluster, then asserts CometScanRule claims the scan. Verified RED (fails with `Set.empty`: scan falls back to Spark) -> GREEN (passes with `Set("hdfs")`) on Spark 3.5. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…into delta-integration-base-v2
…delta-integration-base-v2
|
@andygrove FYI... plan for review chunks is in the description of this PR now. will keep this PR in place so reviewers can reference the "full" work product if there are questions about the current status/features/test coverage/etc... as the feature is feathered in. Also tagging in @mbutrovich per @andygrove Please let me know if this chunking plan makes sense or if there is anything you'd like me to rework. |
# Conflicts: # .gitignore
…gnore conflict
Conflict resolutions: - operator.proto: upstream added BroadcastNestedLoopJoin=117 (collides with Delta's delta_scan); kept BNLJ=117, moved DeltaScan to 118. Docs updated. - operator_registry.rs / jni_api.rs: kept both new match arms (exhaustive over the regenerated OpStruct enum). - operators.scala: upstream refactored CometExecRDD construction into buildNativeContext() /NativeExecContext + executeColumnarWithContext(ctx). Re-threaded Delta's perPartitionFilePaths through NativeExecContext (new field, computed in buildNativeContext, passed to the RDD ctor) so per-file FAILED_READ_FILE.NO_HINT reporting survives the refactor. - CometArrayExpressionSuite.scala: kept both new tests (nullability-divergent CreateArray decline + ansi GetArrayItem-on-null-split). - Cargo.lock: took upstream's lock; cargo re-added Delta deps (consistent under --locked). Verified: native 'cargo check' (default) and spark 'test-compile' both green.
Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (apache#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: apache#4366).
Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (apache#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: apache#4366).
|
Thanks again for the work on this @schenksj and for helping us with reviews by splitting out some parts into smaller PRs. Really appreciate it! I plan to start reviewing this PR as well next week. As I've mentioned before, I am supportive of merging this as an experimental feature gated behind a config to allow others to test it out. I think the main concern from the core maintainers is just ensuring that this work doesn't have any negative impact on other planned work, so I'll mostly be reviewing from that point of view. |
|
Thanks Andy. I’ll start peeling off the Smaller PRs. I’ll get my friend Claude working on the breakup. It shouldn’t conflict with in-progress things since the build gate only compiles in a tiny amount of code when disabled |
# Conflicts: # native/shuffle/src/spark_unsafe/unsafe_object.rs # spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala # spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (apache#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: apache#4366).
Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (apache#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: apache#4366).
|
@andygrove — starting the breakup of this PR into small, independently-reviewable pieces. They form a stacked chain (each part builds on the previous); every part is carved, fully verified, and reviewed clean. Part 1 is open here upstream; the rest are staged as review drafts on my fork and will be opened here in dependency order as each base merges to
Sizes, dependencies, and status are tracked in the PR description table above. |
…b split, part 2] Part 2 of the Delta Lake contrib PR breakup (tracking: apache#4366). Establishes the `contrib-delta` build gate and the inert wiring that lets a gated build compile end to end, while the DEFAULT build stays byte-for-byte unchanged (zero Delta surface). No real Delta read logic yet -- that lands in later parts; here a Delta read that reaches native returns a clean "not implemented" error and falls back to vanilla Spark. Build gate: - Maven `contrib-delta` profile (spark/pom.xml) with per-Spark `delta.version` (3.5->3.3.2, 4.0->4.0.0, 4.1->4.1.0) and an add-source of contrib/delta/src. Default `delta.version` floor in pom.xml. The default spark.version stays 4.1.2 (the delta-spark 4.1.1 pin is a separate, deferred decision). - Cargo `contrib-delta` feature on core (optional path dep on comet-contrib-delta); `native/Cargo.toml` excludes ../contrib from the workspace. - `dev/verify-contrib-delta-gate.sh` proves default cargo/mvn/dylib carry zero Delta surface and the gated build pulls the right deps; wired into a minimal `delta_build_gate.yml` CI job (the full suite/regression workflows land later). Hardened the script against a `set -o pipefail` + `grep -q` SIGPIPE misfire (early grep exit -> echo SIGPIPE -> false guard failure) via here-strings. Inert wiring: - Proto: `Delta*` messages + `delta_scan = 118` (117 is BroadcastNestedLoopJoin). - Native dispatch: `OpStruct::DeltaScan` arm with a not-compiled-in error on default builds and a feature-gated `delta_scan` shim that calls the contrib; exhaustive-match arms in operator_registry/jni_api; `convert_spark_types_to_ arrow_schema` promoted to pub(crate). - Stub contrib crate (contrib/delta/native): `plan_delta_scan` returns `DataFusionError::NotImplemented` -- just enough to satisfy the core shim's contract so `--features contrib-delta` links. - JVM bridge `DeltaIntegration` (reflective, all lookups return None until the contrib classes exist), the CometExecRule Delta-marker hook (CDF hook deferred to a later part), the CometScanRule Delta delegation + metadata-col reorder, and the leaf `DeltaConf`. Verification: default + gated native build, clippy both feature states, gate script, gated + default JVM compile, spotless/scalastyle, cargo fmt -- all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… + JNI) [Delta contrib split, part 3a] Part 3a of the Delta Lake contrib PR breakup (tracking: apache#4366). Replaces the build-gate stub crate's deps with the real driver-side modules and the delta-kernel-rs dependency, while the executor-side read path stays deferred (the `planner` stub still returns NotImplemented until part 3b adds `kernel_scan`/`dv_reader` + the real planner). Driver side (open table, replay log, push predicates, return a DeltaScanTaskList over JNI): - `error.rs` - DeltaError / DeltaResult. - `engine.rs` - delta-kernel engine + object_store config (S3/Azure/GCS/local) for log replay. - `predicate.rs` - Catalyst -> kernel predicate translation for file skipping. - `scan.rs` - log replay -> DeltaFileEntry/DeltaScanPlan; scan-task assembly. - `jni.rs` - `Native_planDeltaScan` / `Native_planDeltaReadSchemas` JNI entry points (the JVM `Native.scala` that calls them lands in part 4b). - `lib.rs` - declares the driver modules + keeps the `planner` stub; drops the `dv_reader`/`kernel_scan` module decls (part 3b). - `Cargo.toml` - only the deps the driver set uses (delta_kernel, object_store, arrow, jni, prost, serde_json, url, thiserror, log + jni-bridge); executor deps arrive in 3b. The driver set is self-contained (`jni -> scan -> engine -> error`, `predicate` standalone); nothing references the deferred modules. Core is untouched -- the dispatch shim still calls `planner::plan_delta_scan` (the stub) so a Delta read falls back to vanilla Spark until 3b. The gate-verify cargo-tree assertion is re-tightened to require `delta_kernel` (now real). Verification: gated native build, 54 in-crate unit tests (cargo test), default native build unchanged, clippy (both feature states), gate-verify script, cargo fmt -- all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s) [Delta contrib split, part 3b] Part 3b of the Delta Lake contrib PR breakup (tracking: apache#4366). Completes the contrib native crate: the executor-side read path replaces the build-gate stub planner, so a `-Pcontrib-delta` build now does end-to-end native Delta reads (given a scan task, read through delta-kernel-rs, apply the transform + deletion vectors). - `planner.rs` - replaces the stub: assembles the per-task `DataSourceExec` (parquet scan + partition values + DV filter), wired to the core dispatch shim's `plan_delta_scan` call. - `kernel_scan.rs` - the kernel read bridge (`planner` <-> `kernel_scan` are mutually dependent and ship together): schema resolution, column-mapping, row-tracking, transform. - `dv_reader.rs` - Delta deletion-vector decode (inline + on-disk roaring bitmaps), surfaced as a DataFusion filter; missing-DV-file maps to SparkError::FileNotFound for parity. - `lib.rs` - re-adds the `dv_reader`/`kernel_scan` module decls and the `DeltaScan`/`DeltaScanCommon` proto re-exports trimmed in 3a. - `Cargo.toml` - re-adds the executor deps (parquet, roaring, datafusion-datasource, futures, chrono*, comet-common, tokio dev-dep) deferred from 3a. Core is untouched -- the dispatch shim is unchanged; it now reaches the real planner instead of the stub. The native crate is now equivalent to the integration branch (modulo the crate version, kept at 0.18.0, and a clarified planner doc-link). Default builds still carry zero Delta surface. Verification: gated native build, 89 in-crate unit tests (54 driver + 35 executor), default native build unchanged, clippy (both feature states), gate-verify script (contrib libcomet +13 MB), cargo fmt -- all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…lit, part 4a] Part 4a of the Delta Lake contrib PR breakup (tracking: apache#4366). The JVM claim/decline layer: `DeltaScanRule` recognises a V1 Delta scan and plants a `CometDeltaScanMarker` wrapping the original scan. This "activates" the reflective path A.2 already wired (`DeltaIntegration` looks up `DeltaScanRule$`), but stays INERT end to end: with no serde yet (`CometDeltaNativeScan`, part 4b), `CometExecRule`'s `scanHandler` lookup returns None, so the marker is left in the plan and executes as a vanilla Delta fallback. Net behavior on this build: Delta reads still run on vanilla Spark. - `DeltaScanRule.scala` - claim/decline rule (declines input_file_name, encryption, etc.; plants the marker otherwise). - `CometDeltaScanMarker.scala` - leaf exec node wrapping the original `FileSourceScanExec`; `doExecute` delegates to it (the vanilla fallback). FQN matches A.2's `DeltaIntegration. MarkerClass` string. - `DeltaScanMetadata.scala` - planning info carried on the marker; now also home to the `ScanImpl` constant (moved off the not-yet-present serde so the rule can name it). - `DeltaReflection.scala` - reflective Delta accessors (CDF members inert until part 5). - `RowTrackingAugmentedFileIndex.scala` - row-tracking file index used by the rule. - Tests: `CometDeltaTestBase` (trimmed of the serde/exec-dependent native-read helpers, which move to 4b; added marker-claim helpers) + a new `CometDeltaMarkerSuite` asserting the marker is planted on a plain read, the fallback is result-correct, and a declined projection plants no marker. - `dev/ci/check-suites.py` - exempt contrib test suites (they run under the dedicated Delta workflow, not the default pr_build matrix). No core / A.2 edits needed -- the reflective wiring already reaches this rule + marker. Required edit per the split plan: `CometDeltaNativeScan.ScanImpl` -> `DeltaScanMetadata.ScanImpl` (4b re-points the serde at it). Verification: gated JVM test-compile, `CometDeltaMarkerSuite` 3/3 (marker planted is red on the A.2 build / green here), check-suites, spotless + scalastyle, gate-verify (default build still 0 Delta symbols, only the DeltaIntegration bridge class) -- all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
apache#4535 main, review round 2 clean
…); apache#4366 sync deferred to end-of-cycle
…flaky re-run) + apache#4366 carveout links
…on table + 411-line pre-refactor tech-ref → docs pointer)
📋 Tracking PR — complete Delta contrib work product (being split for review)
Split sequence & status
Legend: 📋 not started · 🔨 in progress · 🔎 in review · ✅ merged
CometScanWithPlanDatatrait, leaf-scan match, DPP rewrite, reflective injector slot)mainDeltaIntegrationbridge, rule hooks, gate script + CI, stub crate)error/engine/predicate/scan/jni.rs, 54 unit tests)dv_reader/kernel_scan/planner.rs, 35 unit tests)DeltaConf/DeltaReflection/DeltaScanMetadata/CometDeltaScanMarker/RowTrackingAugmentedFileIndex/DeltaScanRule)CometDeltaNativeScanserde,Native.scala, exec node,DeltaPlanDataInjector+ suites)CometDeltaCdfScanExec,CometExecRuleCDF hook, CDC suites)delta_contrib_test.yml,check-suites.py)dev/diffs/*,run-regression.sh,run-test.sh, workflow)contrib/delta/docs/*, user-guide pages)FAILED_READ_FILEparity for DeltaCore-change PRs extracted from this work (land independently)
Small, self-contained core/shuffle fixes carved out of the monolith so the core touchpoints
are reviewed on their own rather than buried in the Delta diff. All but one have merged (
#4532remains open); they are independent core fixes, not front-of-cycle blockers for the split.
GetStructFieldnull handling for null parent structget_string(lossy decode)object_store-unsupported FS schemesConstantColumnVectoron serialize/export pathsCreateArraywith struct-nullability-divergent childrenPlanDataInjectorlookup by op kindFAILED_READ_FILEArchitecture & validation — technical reference
Current architecture (kernel-read). Each Delta data file is read through
delta-kernel-rs0.24, which shares Comet's arrow-58 so there is no Arrow bridge. The driver resolves the snapshot + per-file list via kernel and ships a typedOpStruct::DeltaScanproto to executors;DeltaKernelScanExecreads each file through kernel's own read + physical→logical transform (column mapping incl. nested, partition-value injection, deletion-vector masking) and synthesizes Delta's virtual columns (__delta_internal_row_index,__delta_internal_is_row_deleted,row_id,row_commit_version,_metadata.*) in-worker on that same path. Change Data Feed (readChangeFeed) is read natively via kernel'sTableChanges, split across multiple Spark partitions. The contrib is gated behind-Pcontrib-delta(Maven) /--features contrib-delta(Cargo); default builds carry zero Delta surface.Authoritative, maintained design docs ship with the split (part 8, schenksj#12) under
contrib/delta/docs/:01-overview(start here),02-planning(Scala planning rule + proto serde),03-native-execution(Rust execution plan),04-design-decisions(the "why"),05-build-and-deploy,06-fallback-and-ops,08-known-limitations(deliberate tradeoffs + tracked issues), plusarchive/(the kernel-read migration plan and its coherence / elimination audits). The user-facing guide isdocs/source/user-guide/latest/delta.md.Each carveout PR carries its own focused description of exactly what it changes — part 1 = #4700; parts 2–9 = schenksj#4 → #13 (see the table at the top).
Validation.
dev/verify-contrib-delta-gate.sh..github/workflows/delta_contrib_test.yml(part 6, schenksj#10)..github/workflows/delta_regression_test.yml/ locallycontrib/delta/dev/run-regression.sh <delta-version> <filter>(part 7, schenksj#11).Upstream issue. apache/datafusion#22366 —
make_arrayelement-type strictness; theCometCreateArraydecline (#4533) is a caller-side workaround until upstream relaxes.